package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.service.Subscription;

/**
 * Background worker that repeatedly polls the registered
 * {@link MessageContainerManager} and pushes pending messages from each
 * active {@link Subscription} to the {@link BrokerClient} it was registered
 * with.
 *
 * <p>The worker loop lives in {@link #run()} and keeps going for as long as
 * the worker is started. When a pass dispatches nothing, the loop idles on an
 * internal lock for at most {@link #POLL_TIMEOUT} milliseconds, or until
 * {@link #wakeup()} or {@link #stop()} is called.
 *
 * <p>{@link #start()} only flips the running flag; it does not create a
 * thread. Whoever owns this worker is expected to execute {@link #run()}.
 */
public class DispatchWorker
  implements Runnable, Service
{
  private static final Log log = LogFactory.getLog(DispatchWorker.class);

  /** Longest time, in milliseconds, that the loop idles before polling again. */
  private static final int POLL_TIMEOUT = 250;

  /** Maps each active Subscription (key) to the BrokerClient (value) that receives its messages. */
  private final Map subscriptions = new ConcurrentHashMap(1000, 0.75F);

  /** Monitor used for the idle wait in run() and for the notifications that end it. */
  private final Object lock = new Object();

  /** Set by wakeup() to tell the loop that new work may be available; guarded by {@link #lock}. */
  private boolean active = true;

  /** Written by start()/stop() and read by the worker thread, hence volatile. */
  private volatile boolean started = false;

  private MessageContainerManager containerManager;

  /**
   * Sets the container manager that is polled at the start of every pass.
   *
   * @param mcm the manager to poll; when {@code null} the poll step is skipped
   */
  public void register(MessageContainerManager mcm)
  {
    this.containerManager = mcm;
  }

  /**
   * Signals that there may be work to do. A worker that is idling is woken
   * immediately; a worker that is in the middle of a pass will skip its next
   * idle wait and start another pass straight away.
   */
  public void wakeup()
  {
    synchronized (this.lock) {
      this.active = true;
      this.lock.notifyAll();
    }
  }

  /**
   * Registers a subscription so that its pending messages are dispatched to
   * the given client. Registering the same subscription again replaces the
   * previously associated client.
   *
   * @param client the client that receives the subscription's messages
   * @param sub    the subscription to start servicing
   */
  public void addActiveSubscription(BrokerClient client, Subscription sub)
  {
    if (log.isDebugEnabled()) {
      // Logged at debug so that the level matches the guard above.
      log.debug("Adding subscription: " + sub + " to client: " + client);
    }
    this.subscriptions.put(sub, client);
  }

  /**
   * Unregisters a subscription so that it is no longer serviced.
   *
   * @param client the client the subscription belonged to (used for logging only)
   * @param sub    the subscription to stop servicing
   */
  public void removeActiveSubscription(BrokerClient client, Subscription sub)
  {
    if (log.isDebugEnabled()) {
      // Logged at debug so that the level matches the guard above.
      log.debug("Removing subscription: " + sub + " from client: " + client);
    }
    this.subscriptions.remove(sub);
  }

  /**
   * Worker loop. Each pass polls the container manager and dispatches the
   * pending messages of every ready subscription. If nothing was dispatched
   * the loop idles for up to {@link #POLL_TIMEOUT} milliseconds unless
   * {@link #wakeup()} was called since the pass began.
   *
   * <p>The loop ends when {@link #stop()} is called or when the executing
   * thread is interrupted while idling; in the latter case the thread's
   * interrupt status is restored before returning.
   */
  public void run()
  {
    while (this.started) {
      // Clear the wakeup flag BEFORE looking for work. Any wakeup() issued
      // from here on is then still visible when we decide whether to idle,
      // so a notification arriving mid-pass is not lost.
      synchronized (this.lock) {
        this.active = false;
      }

      doPoll();
      boolean dispatched = dispatchToReadySubscriptions();

      if (!dispatched) {
        synchronized (this.lock) {
          if ((!this.active) && (this.started)) {
            try {
              this.lock.wait(POLL_TIMEOUT);
            }
            catch (InterruptedException e) {
              // Treat interruption as a request to shut down: keep the
              // interrupt status for the caller and leave the loop rather
              // than spinning on a wait() that would fail again at once.
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }
    }
  }

  /**
   * Marks the worker as running so that {@link #run()} keeps looping. Does
   * not start a thread.
   */
  public void start()
  {
    this.started = true;
  }

  /**
   * Asks the worker loop to finish and wakes it if it is idling, so that it
   * exits without waiting for the idle timeout to expire.
   */
  public void stop()
  {
    synchronized (this.lock) {
      this.started = false;
      this.lock.notifyAll();
    }
  }

  /**
   * Fetches the messages pending on a subscription and sends a shallow copy
   * of each one, tagged with the subscription's consumer number, to the
   * client registered for that subscription. If no client is registered a
   * warning is logged and nothing is sent.
   *
   * @param subscription the subscription whose pending messages are dispatched
   * @param dispatched   the running "something was dispatched" flag for this pass
   * @return {@code true} if at least one message was handed to the client,
   *         otherwise the value passed in as {@code dispatched}
   * @throws JMSException if obtaining or dispatching a message fails
   */
  protected boolean dispatchMessages(Subscription subscription, boolean dispatched)
    throws JMSException
  {
    ActiveMQMessage[] msgs = subscription.getMessagesToDispatch();
    if ((msgs != null) && (msgs.length > 0)) {
      BrokerClient client = (BrokerClient)this.subscriptions.get(subscription);
      if (client == null) {
        log.warn("Null client for subscription: " + subscription);
      }
      else {
        for (int i = 0; i < msgs.length; i++) {
          ActiveMQMessage msg = msgs[i].shallowCopy();

          if (log.isDebugEnabled()) {
            log.debug("Dispatching message: " + msg);
          }
          // A fresh array per message: the message keeps a reference to it.
          int[] consumerNos = new int[1];
          consumerNos[0] = subscription.getConsumerNumber();
          msg.setConsumerNos(consumerNos);
          client.dispatch(msg);
          dispatched = true;
        }
      }
    }
    return dispatched;
  }

  /**
   * Polls the registered container manager, if there is one and the worker
   * is started. A failure is logged and otherwise ignored so that the worker
   * loop keeps running.
   */
  protected void doPoll() {
    if ((this.containerManager != null) && (this.started)) {
      try {
        this.containerManager.poll();
      }
      catch (JMSException e) {
        log.error("Error polling from the ContainerManager: ", e);
      }
    }
  }

  /**
   * Runs one dispatch pass over every registered subscription that reports
   * itself ready. A failure on one subscription is logged and does not stop
   * the remaining subscriptions from being serviced in the same pass.
   *
   * @return {@code true} if at least one message was dispatched in this pass
   */
  private boolean dispatchToReadySubscriptions()
  {
    boolean dispatched = false;
    for (Iterator iter = this.subscriptions.keySet().iterator(); iter.hasNext(); ) {
      Subscription sub = (Subscription)iter.next();
      try {
        if ((sub != null) && (sub.isReadyToDispatch())) {
          dispatched = dispatchMessages(sub, dispatched);
        }
      }
      catch (JMSException jmsEx) {
        log.error("Could not dispatch to Subscription: " + jmsEx, jmsEx);
      }
    }
    return dispatched;
  }
}